/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.unidata.mdm.search.service.impl;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.BooleanFieldMapper;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.TextFieldMapper;
import org.elasticsearch.join.mapper.ParentJoinFieldMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.search.configuration.SearchConfigurationConstants;
import org.unidata.mdm.search.context.MappingRequestContext;
import org.unidata.mdm.search.context.TypedSearchContext;
import org.unidata.mdm.search.exception.SearchExceptionIds;
import org.unidata.mdm.search.type.FieldType;
import org.unidata.mdm.search.type.HierarchicalIndexType;
import org.unidata.mdm.search.type.HierarchicalTopIndexType;
import org.unidata.mdm.search.type.IndexField;
import org.unidata.mdm.search.type.IndexType;
import org.unidata.mdm.search.type.mapping.Mapping;
import org.unidata.mdm.search.type.mapping.MappingField;
import org.unidata.mdm.search.type.mapping.impl.AbstractTemporalMappingField;
import org.unidata.mdm.search.type.mapping.impl.AbstractValueMappingField;
import org.unidata.mdm.search.type.mapping.impl.CompositeMappingField;
import org.unidata.mdm.search.type.mapping.impl.StringMappingField;
import org.unidata.mdm.search.util.SearchUtils;
import org.unidata.mdm.system.exception.PlatformFailureException;
import org.unidata.mdm.system.type.annotation.ConfigurationRef;
import org.unidata.mdm.system.type.configuration.ConfigurationValue;

/**
 * @author Mikhail Mikhailov on Oct 8, 2019
 */
@Component
public class MappingComponentImpl extends BaseComponentImpl {
    /**
     * Logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(MappingComponentImpl.class);

    /**
     * Default type settings.
     */
    private static final Settings DEFAULT_INDEX_SETTINGS = Settings.builder()
            .put("analysis.analyzer." + SearchUtils.DEFAULT_STRING_ANALYZER_NAME + ".type", "custom")
            .put("analysis.analyzer." + SearchUtils.DEFAULT_STRING_ANALYZER_NAME + ".tokenizer", "standard")
            .putList("analysis.analyzer." + SearchUtils.DEFAULT_STRING_ANALYZER_NAME + ".filter",
                      "lowercase",
                      "autocomplete_filter")
            .put("analysis.filter.autocomplete_filter.type", "edge_ngram")
            .put("analysis.filter.autocomplete_filter.min_gram", "1")
            .put("analysis.filter.autocomplete_filter.max_gram", "55")
            .build();

    private static final Settings LOWERCASE_NORMALIZER_INDEX_SETTINGS = Settings.builder()
            .put("analysis.normalizer." + SearchUtils.LOWERCASE_STRING_NORMALIZER_NAME  +  ".type", "custom")
            .putList("analysis.normalizer." + SearchUtils.LOWERCASE_STRING_NORMALIZER_NAME + ".filter",
                      "lowercase")
            .build();

    private static final Settings SEARCH_INDEX_SETTINGS = Settings.builder()
        .put("analysis.analyzer." + SearchUtils.SEARCH_STRING_ANALYZER_NAME + ".type", "custom")
        .put("analysis.analyzer." + SearchUtils.SEARCH_STRING_ANALYZER_NAME + ".tokenizer", "standard")
        .putList("analysis.analyzer." + SearchUtils.SEARCH_STRING_ANALYZER_NAME + ".filter",
            "lowercase")
        .build();

    private static final Settings UNIDATA_CUSTOM_SIMILARITY_SETTINGS = Settings.builder()
        .put("similarity." + SearchUtils.UNIDATA_CUSTOM_SIMILARITY  +  ".type", "BM25")
        .build();
    /**
     * Morpho russian + english.
     */
    private static final Settings MORPHOLOGICAL_INDEX_SETTINGS = Settings.builder()
            .put("analysis.analyzer." + SearchUtils.MORPH_STRING_ANALYZER_NAME + ".type", "custom")
            .put("analysis.analyzer." + SearchUtils.MORPH_STRING_ANALYZER_NAME + ".tokenizer", "standard")
            .putList("analysis.analyzer." + SearchUtils.MORPH_STRING_ANALYZER_NAME + ".filter",
                      "lowercase",
                      "hunspell_ru_RU",
                      "hunspell_en_US"
                      )
            // english hunspell dictionary
            .put("analysis.filter.hunspell_en_US.type", "hunspell")
            .put("analysis.filter.hunspell_en_US.locale", "en_US")
            .put("analysis.filter.hunspell_en_US.dictionary", "en_US")
            .put("analysis.filter.hunspell_en_US.dedup", false)
            // russian hunspell dictionary
            .put("analysis.filter.hunspell_ru_RU.type", "hunspell")
            .put("analysis.filter.hunspell_ru_RU.locale", "ru_RU")
            .put("analysis.filter.hunspell_ru_RU.dictionary", "ru_RU")
            .put("analysis.filter.hunspell_ru_RU.dedup", false)
            .build();
    /**
     * Number of shards
     */
    @ConfigurationRef(SearchConfigurationConstants.PROPERTY_SHARDS_NUMBER)
    private ConfigurationValue<Long> numberOfShards;
    /**
     * Number of replicas
     */
    @ConfigurationRef(SearchConfigurationConstants.PROPERTY_REPLICAS_NUMBER)
    private ConfigurationValue<Long> numberOfReplicas;
    /**
     * Number of fields per index.
     */
    @ConfigurationRef(SearchConfigurationConstants.PROPERTY_FIELDS_LIMIT)
    private ConfigurationValue<Long> numberOfFields;
    /**
     * Admin action timeout.
     */
    @ConfigurationRef(SearchConfigurationConstants.PROPERTY_ADMIN_ACTION_TIMEOUT)
    private ConfigurationValue<Long> adminActionTimeout;
    /**
     * Transport client to use.
     */
    @Autowired
    private Client client;
    /**
     * Constructor.
     */
    public MappingComponentImpl() {
        super();
    }
    /**
     * Does processing of the mapping process, which may be:
     * <ul>
     * <li>- create index</li>
     * <li>- drop index</li>
     * <li>- put / merge mapping</li>
     * </ul>
     * @param ctx the context
     * @return true, if successful, false otherwise
     */
    public boolean process(@Nonnull MappingRequestContext ctx) {

        // Drop and exit
        if (ctx.drop()) {
            return dropIndex(ctx);
        }

        boolean exists = indexExists(ctx);
        if (exists && ctx.forceCreate()) {
            closeIndex(ctx);
            dropIndex(ctx);
            exists = false;
        }

        if (!exists) {
            createIndex(ctx, initIndexProperties(ctx));
        }

        for (Mapping m : ctx.getMappings()) {
            createMapping(ctx, m);
        }

        return true;
    }

    private Properties initIndexProperties(MappingRequestContext ctx) {

        Properties supplied = new Properties();
        if (ctx.getShards() > 0) {
            supplied.setProperty(SearchUtils.ES_NUMBER_OF_SHARDS_SETTING, Integer.toString(ctx.getShards()));
        }

        if (ctx.getReplicas() > 0) {
            supplied.setProperty(SearchUtils.ES_NUMBER_OF_REPLICAS_SETTING, Integer.toString(ctx.getReplicas()));
        }

        if (ctx.getFields() > 0) {
            supplied.setProperty(SearchUtils.ES_LIMIT_OF_TOTAL_FIELDS, Integer.toString(ctx.getFields()));
        }

        if (ctx.whitespace()) {
            supplied.put("analysis.analyzer." + SearchUtils.DEFAULT_STRING_ANALYZER_NAME + ".tokenizer", "whitespace");
            supplied.put("analysis.analyzer." + SearchUtils.MORPH_STRING_ANALYZER_NAME + ".tokenizer", "whitespace");
            supplied.put("analysis.analyzer." + SearchUtils.SEARCH_STRING_ANALYZER_NAME + ".tokenizer", "whitespace");
        }

        processSimilarityConfiguration(ctx, supplied);

        return supplied;
    }

    private void processSimilarityConfiguration(@Nonnull MappingRequestContext ctx, Properties indexProperties) {
        if (StringUtils.isNotEmpty(ctx.scoreStrategy())) {
            indexProperties.put("similarity." + SearchUtils.UNIDATA_CUSTOM_SIMILARITY + ".type", ctx.scoreStrategy());
        }
        if (MapUtils.isNotEmpty(ctx.scoreStrategyParams())) {
            ctx.scoreStrategyParams().forEach((paramName, paramValue) ->
                indexProperties.put("similarity." + SearchUtils.UNIDATA_CUSTOM_SIMILARITY + "." + paramName, paramValue));
        }
    }

    private Settings verifyIndexProperties(Properties specialProperties) {

        Settings.Builder result = Settings.builder();

        // This is block with default properties.
        result.put(SearchUtils.ES_NUMBER_OF_SHARDS_SETTING, numberOfShards.getValue().toString());
        result.put(SearchUtils.ES_NUMBER_OF_REPLICAS_SETTING, numberOfReplicas.getValue().toString());
        result.put(SearchUtils.ES_LIMIT_OF_TOTAL_FIELDS, numberOfFields.getValue().toString());
        result.put(SearchUtils.ES_MAX_RESULT_WINDOW, maxWindowSize.getValue().toString());

        if (specialProperties != null) {
            specialProperties.forEach((key, value) -> result.put(key.toString(), value.toString()));
        }

        return result.build();
    }

    private boolean createIndex(@Nonnull final TypedSearchContext ctx, @Nullable Properties properties) {

        // 1. Compose the name of the type
        final String indexName = constructIndexName(ctx);

        // 2. Delete if requested
        dropIndex(ctx);

        // 3. Put a new one
        Settings settings = null;
        try {

            settings = Settings.builder()
                    .put(DEFAULT_INDEX_SETTINGS)
                    .put(MORPHOLOGICAL_INDEX_SETTINGS)
                    .put(SEARCH_INDEX_SETTINGS)
                    .put(LOWERCASE_NORMALIZER_INDEX_SETTINGS)
                    .put(UNIDATA_CUSTOM_SIMILARITY_SETTINGS)
                    .put(verifyIndexProperties(properties))
                    .build();

        } catch (Exception t) {
            LOGGER.error("Settings builder failed. {}", t);
            return false;
        }

        boolean result = client.admin()
                .indices()
                .create(new CreateIndexRequest(indexName, settings))
                .actionGet(adminActionTimeout.getValue())
                .isAcknowledged();

        if (!result) {
            LOGGER.error("Failed to create index '{}'.", indexName);
        }

        return result;
    }

    private boolean createMapping(MappingRequestContext ctx, Mapping mapping) {

        try (XContentBuilder builder = XContentFactory.jsonBuilder()) {

            builder
                .startObject()
                    .field("dynamic", false)
                    .startObject("properties");

            createHierarchicalMapping(mapping.getIndexType(), builder);

            for (MappingField field : mapping.getFields()) {

                if (field.getFieldType() == FieldType.COMPOSITE) {
                    createCompositeFieldMapping((CompositeMappingField) field, builder);
                } else {
                    createValueFieldMapping((AbstractValueMappingField<?>) field, builder);
                }
            }

            builder
                    .endObject()
                .endObject();

            final String indexName = constructIndexName(ctx);
            PutMappingRequestBuilder request = client.admin()
                    .indices()
                    .preparePutMapping(indexName)
                    .setType(SearchConfigurationConstants.DEFAULT_INDEX_TYPE)
                    .setSource(builder);

            AcknowledgedResponse response  = executeRequest(request);
            return response.isAcknowledged();

        } catch (Exception e) {
            final String message = "XContentBuilder threw an exception. {}.";
            throw new PlatformFailureException(message, e, SearchExceptionIds.EX_SEARCH_MAPPING_IO_FAILURE);
        }
    }

    private void createHierarchicalMapping(IndexType type, XContentBuilder builder) throws IOException {

        if (!type.isHierarchical()) {
            return;
        }

        builder
            .startObject(SearchUtils.TYPE_FIELD_NAME)
                .field("type", KeywordFieldMapper.CONTENT_TYPE)
                .field("doc_values", true)
            .endObject();

        HierarchicalIndexType hit =  type.toHierarchical();
        HierarchicalTopIndexType top = hit.getTopType();
        IndexField field = hit.getTopType().getJoinField();

        builder
            .startObject(field.getName())
            .field("type", ParentJoinFieldMapper.CONTENT_TYPE)
                .startObject("relations");

        builder.array(top.getName(), top.getChildren().stream().map(HierarchicalIndexType::getName).toArray(String[]::new));
        builder
                .endObject()
            .endObject();
    }

    private void createCompositeFieldMapping(CompositeMappingField field, XContentBuilder builder) throws IOException {

        builder
            .startObject(field.getName());

        if (field.isNested()) {
            builder.field("type", "nested");
        }

        builder.startObject("properties");

        for (MappingField f : field.getFields()) {

            if (f.getFieldType() == FieldType.COMPOSITE) {
                createCompositeFieldMapping((CompositeMappingField) f, builder);
            } else {
                createValueFieldMapping((AbstractValueMappingField<?>) f, builder);
            }
        }

        builder
                .endObject()
            .endObject();
    }

    private void createValueFieldMapping(AbstractValueMappingField<?> field, XContentBuilder builder) throws IOException {

        builder
            .startObject(field.getName());

        switch (field.getFieldType()) {
        case BOOLEAN:
            builder.field("type", BooleanFieldMapper.CONTENT_TYPE);
            break;
        case DATE:
        case TIME:
        case TIMESTAMP:
            builder.field("type", DateFieldMapper.CONTENT_TYPE);

            AbstractTemporalMappingField<?> temporalField = (AbstractTemporalMappingField<?>) field;
            if (StringUtils.isNotBlank(temporalField.getFormat())) {
                builder.field("format", temporalField.getFormat());
            } else if (temporalField.getFieldType() == FieldType.TIME) {
                builder.field("format", "'T'HH:mm:ss||'T'HH:mm:ss.SSS||HH:mm:ss||HH:mm:ss.SSS||'T'HH:mm:ssZZ||'T'HH:mm:ss.SSSZZ||HH:mm:ssZZ||HH:mm:ss.SSSZZ");
            }

            break;
        case NUMBER:
            builder.field("type", "double");
            break;
        case INTEGER:
            builder.field("type", "long");
            break;
        case STRING:

            StringMappingField stringField = (StringMappingField) field;
            if (stringField.isAnalyzed()) {

                builder.field("type", TextFieldMapper.CONTENT_TYPE);
                builder.field("analyzer", SearchUtils.DEFAULT_STRING_ANALYZER_NAME);

                // Add not analyzed counter part for term matches
                builder
                    .startObject("fields")
                        .startObject(SearchUtils.NAN_FIELD)
                            .field("type", KeywordFieldMapper.CONTENT_TYPE);

                if (stringField.isCaseInsensitive()) {
                    builder.field("normalizer", SearchUtils.LOWERCASE_STRING_NORMALIZER_NAME);
                }

                builder.endObject(); // $nan

                builder.startObject(SearchUtils.DEFAULT_SEARCH_ANALIZED_FIELD)
                    .field("type", TextFieldMapper.CONTENT_TYPE)
                    .field("analyzer", SearchUtils.SEARCH_STRING_ANALYZER_NAME)
                    .endObject();

                if (stringField.isMorphologicalAnalysis()) {
                    builder
                        .startObject(SearchUtils.MORPH_FIELD)
                            .field("type", TextFieldMapper.CONTENT_TYPE)
                            .field("analyzer", SearchUtils.MORPH_STRING_ANALYZER_NAME)
                        .endObject();
                }

                builder.endObject(); // fields
            } else {

                builder.field("type", KeywordFieldMapper.CONTENT_TYPE);
            }

            break;
        default:
            final String message = "Cannot map attribute '{}'. Type '{}' is not recognized or misplaced!";
            LOGGER.warn(message, field.getName(), field.getFieldType());
            throw new PlatformFailureException(message, SearchExceptionIds.EX_SEARCH_MAPPING_TYPE_UNKNOWN,
                    field.getName(), field.getFieldType().name());
        }

        createValueFieldCommonMapping(field, builder);

        builder
            .endObject();
    }

    private void createValueFieldCommonMapping(AbstractValueMappingField<?> field, XContentBuilder builder) throws IOException {

        if (Objects.nonNull(field.getDefaultValue())) {
            builder.field("null_value", field.getDefaultValue());
        }

        if (!field.isIndexed()) {
            builder.field("index", false);
        }

        if (field.isDocValue()) {
            builder.field("doc_values", true);
        }
    }
    /**
     * Drops an index.
     *
     * @param ctx the context to use
     * @return true, if successful, false otherwise
     */
    public boolean dropIndex(@Nonnull final TypedSearchContext ctx) {

        // 1. Compose the name of the type
        final String indexName = constructIndexName(ctx);

        // 2. Check exists
        boolean exists = indexExists(ctx);

        // 3. Delete
        boolean deleteSuccess = false;
        if (exists) {

            deleteSuccess = client.admin()
                    .indices()
                    .delete(new DeleteIndexRequest(indexName))
                    .actionGet(adminActionTimeout.getValue())
                    .isAcknowledged();

            if (!deleteSuccess) {
                LOGGER.error("Failed to delete index '{}'. Returning.", indexName);
                return false;
            }
        } else {
            return true;
        }

        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("Drop index exception", e);
        }

        return deleteSuccess;
    }

    /**
     * Tells if an index exists.
     *
     * @param ctx the context
     * @return true, if exists, false otherwise
     */
    public boolean indexExists(@Nonnull final TypedSearchContext ctx) {

        // 1. Compose the name of the type
        final String indexName = constructIndexName(ctx);

        // 2. Check index exists
        return client.admin()
                .indices()
                .exists(new IndicesExistsRequest(indexName))
                .actionGet(adminActionTimeout.getValue())
                .isExists();
    }

    /**
     * {@inheritDoc}
     */
    public boolean refreshIndex(TypedSearchContext ctx, boolean wait) {

        // 1. Compose the name of the type
        final String indexName = constructIndexName(ctx);

        // 2. Check index exists
        if (wait) {
            return client.admin()
                    .indices()
                    .refresh(new RefreshRequest(indexName))
                    .actionGet()
                    .getFailedShards() == 0;
        }

        return client.admin()
                .indices()
                .refresh(new RefreshRequest(indexName))
                .actionGet(adminActionTimeout.getValue())
                .getFailedShards() == 0;
    }

    /**
     * {@inheritDoc}
     */
    public boolean closeIndex(TypedSearchContext ctx) {

        // 1. Compose the name of the type
        final String indexName = constructIndexName(ctx);

        // 2. Close the index
        return client.admin()
                .indices()
                .prepareClose(indexName)
                .execute()
                .actionGet(adminActionTimeout.getValue())
                .isAcknowledged();
    }

    /**
     * {@inheritDoc}
     */
    public boolean openIndex(TypedSearchContext ctx) {

        // 1. Compose the name of the type
        final String indexName = constructIndexName(ctx);

        // 2. Open the index
        return client.admin()
                .indices()
                .prepareOpen(indexName)
                .execute()
                .actionGet(adminActionTimeout.getValue())
                .isAcknowledged();
    }

    /**
     * {@inheritDoc}
     */
    public boolean setIndexSettings(TypedSearchContext ctx, Map<String, Object> settings) {

        // 1. Compose the name of the type
        final String indexName = constructIndexName(ctx);

        // 2. Set refresh interval
        return client.admin()
                .indices()
                .prepareUpdateSettings(indexName)
                .setSettings(settings)
                .execute()
                .actionGet(adminActionTimeout.getValue())
                .isAcknowledged();
    }

    /**
     * {@inheritDoc}
     */
    public boolean setClusterSettings(Map<String, Object> settings, boolean persistent) {

        // 1. Set settings
        ClusterUpdateSettingsRequestBuilder b = client.admin()
                .cluster()
                .prepareUpdateSettings();

        if (persistent) {
            b.setPersistentSettings(settings);
        } else {
            b.setTransientSettings(settings);
        }

        // 2. Execute
        return b.execute()
                .actionGet(adminActionTimeout.getValue())
                .isAcknowledged();
    }
}
